subject <- raveio::RAVESubject$new(
  project_name = project_name, 
  subject_code = subject_code, 
  strict = FALSE
)
imported_electrodes <- subject$electrodes[
  subject$preprocess_settings$data_imported
]

if(!length(imported_electrodes)){
  stop("The subject exists but its signal has not been imported yet.")
}

lb <- unlist(notch_filter_lowerbound)
ub <- unlist(notch_filter_upperbound)
if(length(lb) != length(ub)) {
  stop(sprintf("Notch filter lower bound length should match with the upper bound length (%d vs %d)", length(lb), length(ub)))
}

if(length(lb)) {
  
  if(!all(lb < ub)) {
    sel <- lb >= ub
    lb <- lb[sel]
    ub <- ub[sel]
    stop("Notch filter lower bounds must be uniformly smaller than the upper bounds: (", paste0(lb, ">", ub, collapse = ", "), ")")
  }
  
}

filter_settings <- list(
  lb = lb,
  ub = ub,
  domain = 1
)

blocks <- subject$preprocess_settings$blocks
electrodes <- imported_electrodes
filters <- filter_settings

fmt <- file.path(subject$preprocess_path, "voltage", "electrode_%d.h5")
sample_rates <- subject$raw_sample_rates
sample_rates <- sapply(electrodes, function(e){
  sample_rates[subject$electrodes == e]
})

dipsaus::lapply_async2(
  seq_along(electrodes), function(ii){
    e <- electrodes[[ii]]
    srate <- sample_rates[[ii]]
    h5_path <- sprintf(fmt, e)
    h5_names <- gsub("^/", "", raveio::h5_names(h5_path))
    sel <- sprintf("raw/%s", blocks) %in% h5_names
    if(!all(sel)) {
      stop(sprintf(
        "Cannot find imported block(s): %s (electrode %s)",
        blocks[!sel], e
      ))
    }
    for(block in blocks){
      cat(block, e, "\n")
      signal <- raveio::load_h5(h5_path, sprintf("raw/%s", block), ram = TRUE)
      signal <- ravetools::notch_filter(s = signal, sample_rate = srate, lb = filters$lb, ub = filters$ub, domain = filters$domain)
      raveio::save_h5(x = signal, file = h5_path, name = sprintf("notch/%s", block), chunk = 1024, replace = TRUE, ctype = "numeric")
    }
  }, plan = FALSE, callback = function(ii){
    sprintf("Applying Notch filters|Electrode - %s", electrodes[[ii]])
  }
)
#> 008 13 
#> 010 13 
#> 011 13 
#> 012 13 
#> 008 14 
#> 010 14 
#> 011 14 
#> 012 14 
#> 008 15 
#> 010 15 
#> 011 15 
#> 012 15 
#> 008 16 
#> 010 16 
#> 011 16 
#> 012 16 
#> 008 24 
#> 010 24 
#> 011 24 
#> 012 24
#> [[1]]
#> NULL
#> 
#> [[2]]
#> NULL
#> 
#> [[3]]
#> NULL
#> 
#> [[4]]
#> NULL
#> 
#> [[5]]
#> NULL

# save to subject
preproc <- raveio::RAVEPreprocessSettings$new(subject = subject$subject_id)

for(e in electrodes){
  preproc$data[[as.character(e)]]$notch_filtered <- TRUE
}
preproc$save()

apply_notch <- list(
  electrodes = electrodes,
  notch_filter_lowerbound = filters$lb,
  notch_filter_upperbound = filters$ub,
  timestamp = strftime(Sys.time(), usetz = TRUE)
)

subject$set_default(
  namespace = "notch_filter",
  key = "parameters",
  value = apply_notch
)

diagnostic_plots <- FALSE
params <- as.list(diagnostic_plot_params)

background <- params$background
foreground <- params$foreground

if(!length(background) == 1) {
  background <- "white"
}
if(!length(foreground) == 1) {
  foreground <- "black"
}


if(length(params$path) == 1) {
  
  grDevices::pdf(file = params$path, width = 12, height = 7, onefile = TRUE, bg = background, fg = foreground, useDingbats = FALSE)
  on.exit({
    grDevices::dev.off()
  }, add = TRUE)
}

winlen <- as.numeric(params$window_length)
if(!length(winlen) || is.na(winlen)) {
  winlen <- "auto"
}
  
max_freq <- as.numeric(params$max_frequency)
if(!length(max_freq) || is.na(max_freq)) {
  max_freq <- 300
}
nbins <- as.numeric(params$histogram_bins)
if(!length(nbins) || is.na(nbins)) {
  nbins <- 50
}


font_size <- as.numeric(params$font_size)
if(!length(font_size) || is.na(font_size)) {
  font_size <- 2
}
quiet <- isTRUE(params$quiet)


diagnostic_plots <- diagnose_notch_filters(
  subject = subject,
  electrodes = imported_electrodes,
  max_freq = max_freq,
  winlen = winlen,
  nbins = nbins,
  bg = background,
  fg = foreground,
  cex = font_size,
  std = 3,
  lwd = 0.3,
  quiet = quiet
)
#> [Generating diagnostic plots]: initializing... 
[Generating diagnostic plots]: Electrode 13 (start) (1 out of 10) 

#> [Generating diagnostic plots]: Electrode 13 (end) (2 out of 10) 
[Generating diagnostic plots]: Electrode 14 (start) (3 out of 10) 

#> [Generating diagnostic plots]: Electrode 14 (end) (4 out of 10) 
[Generating diagnostic plots]: Electrode 15 (start) (5 out of 10) 

#> [Generating diagnostic plots]: Electrode 15 (end) (6 out of 10) 
[Generating diagnostic plots]: Electrode 16 (start) (7 out of 10) 

#> [Generating diagnostic plots]: Electrode 16 (end) (8 out of 10) 
[Generating diagnostic plots]: Electrode 24 (start) (9 out of 10) 

#> [Generating diagnostic plots]: Electrode 24 (end) (10 out of 10) 

Build, Visualize, & Run

Please make sure the following code block is at the end of your pipeline file. This block will build the pipeline and generate a make-notch_filter.R script with your pipeline markdown file. RAVE will use the generated pipeline script to execute the pipeline in the dashboard application, or in massive production mode.

Once the pipeline script make-notch_filter.R is built, you can visualize and execute the pipeline without the need of re-knit this document. Notice we use r block instead of rave. (This is because the code blocks are not part of pipeline targets.)

Sys.setenv("RAVE_PIPELINE" = normalizePath("."))
raveio::pipeline_run(type = "vanilla")
#> • start target settings_path
#> • built target settings_path
#> • start target settings
#> • built target settings
#> • start target notch_filter_lowerbound
#> • built target notch_filter_lowerbound
#> • start target notch_filter_upperbound
#> • built target notch_filter_upperbound
#> • start target diagnostic_plot_params
#> • built target diagnostic_plot_params
#> • start target subject_code
#> • built target subject_code
#> • start target project_name
#> • built target project_name
#> • start target filter_settings
#> • built target filter_settings
#> • start target subject
#> • built target subject
#> • start target imported_electrodes
#> • built target imported_electrodes
#> • start target diagnostic_plots
#> [Generating diagnostic plots]: initializing... 
[Generating diagnostic plots]: Electrode 13 (start) (1 out of 10) 
#> [Generating diagnostic plots]: Electrode 13 (end) (2 out of 10) 
[Generating diagnostic plots]: Electrode 14 (start) (3 out of 10) 
#> [Generating diagnostic plots]: Electrode 14 (end) (4 out of 10) 
[Generating diagnostic plots]: Electrode 15 (start) (5 out of 10) 
#> [Generating diagnostic plots]: Electrode 15 (end) (6 out of 10) 
[Generating diagnostic plots]: Electrode 16 (start) (7 out of 10) 
#> [Generating diagnostic plots]: Electrode 16 (end) (8 out of 10) 
[Generating diagnostic plots]: Electrode 24 (start) (9 out of 10) 
#> [Generating diagnostic plots]: Electrode 24 (end) (10 out of 10) 
#> • built target diagnostic_plots
#> • start target apply_notch
#> 008 13 
#> 010 13 
#> 011 13 
#> 012 13 
#> 008 14 
#> 010 14 
#> 011 14 
#> 012 14 
#> 008 15 
#> 010 15 
#> 011 15 
#> 012 15 
#> 008 16 
#> 010 16 
#> 011 16 
#> 012 16 
#> 008 24 
#> 010 24 
#> 011 24 
#> 012 24
#> • built target apply_notch
#> • end pipeline: 19.886 seconds
#> <Pipeline result container> 
#> process: native
#> status: running (0 of 12)
raveio::pipeline_progress(method = 'details')
#> # A tibble: 12 × 2
#>    name                    progress
#>    <chr>                   <chr>   
#>  1 settings_path           built   
#>  2 settings                built   
#>  3 notch_filter_lowerbound built   
#>  4 notch_filter_upperbound built   
#>  5 diagnostic_plot_params  built   
#>  6 subject_code            built   
#>  7 project_name            built   
#>  8 filter_settings         built   
#>  9 subject                 built   
#> 10 imported_electrodes     built   
#> 11 diagnostic_plots        built   
#> 12 apply_notch             built